package com.smile.cloud.admin.rocketmq;

import com.smile.cloud.admin.rocketmq.core.DedupConcurrentListener;
import com.smile.cloud.admin.rocketmq.core.DedupConfig;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;

import java.nio.charset.StandardCharsets;

@Slf4j
public class SampleListener extends DedupConcurrentListener {

    public SampleListener(DedupConfig dedupConfig) {
        super(dedupConfig);
    }

    /**
     * 基于什么做消息去重，每一类不同的消息都可以不一样，做去重之前会尊重此方法返回的值
     *
     * @param messageExt
     * @return
     */
//    @Override
//    protected String dedupMessageKey(MessageExt messageExt) {
//        //为了简单示意，这里直接使用消息体作为去重键
//        if ("MyTopic".equals(messageExt.getTopic())) {
//            return new String(messageExt.getBody());
//        } else {//其他使用默认的配置（消息id）
//            return super.dedupMessageKey(messageExt);
//        }
//
//    }
    @Override
    protected boolean doHandleMsg(MessageExt messageExt) {
        if ("MyTopic".equals(messageExt.getTopic())) {
            String topic = messageExt.getTopic();
            String tags = messageExt.getTags();
            String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
            log.info("receive MQ消息 topic={}, tags={}, msg={},messageExt={}", topic, tags, body, messageExt);
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return true;
    }

}
